Library Imports
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
Template
spark = (
    SparkSession.builder
    .master("local")
    .appName("Exploring Joins")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)
sc = spark.sparkContext
Initial Datasets
pets = spark.createDataFrame(
    [
        (1, 1, 'Bear'),
        (2, 1, 'Chewie'),
        (3, 2, 'Roger'),
    ], ['id', 'breed_id', 'nickname']
)
pets.toPandas()
| | id | breed_id | nickname |
|---|---|---|---|
| 0 | 1 | 1 | Bear |
| 1 | 2 | 1 | Chewie |
| 2 | 3 | 2 | Roger |
breeds = spark.createDataFrame(
    [
        (1, 'Pitbull', 10),
        (2, 'Corgie', 20),
    ], ['breed_id', 'name', 'average_height']
)
breeds.toPandas()
| | breed_id | name | average_height |
|---|---|---|---|
| 0 | 1 | Pitbull | 10 |
| 1 | 2 | Corgie | 20 |
Filter Pushdown
Filter pushdown improves performance by reducing the amount of data shuffled during DataFrame transformations. Depending on your filter logic and where you place your filter code, your Spark code will behave differently.
Case #1: Filtering on Only One Side of the Join
df = (
    pets
    .join(breeds, 'breed_id', 'left_outer')
    .filter(F.col('nickname') == 'Chewie')
)
df.toPandas()
| | breed_id | id | nickname | name | average_height |
|---|---|---|---|---|---|
| 0 | 1 | 2 | Chewie | Pitbull | 10 |
df.explain()
== Physical Plan ==
*(4) Project [breed_id#1L, id#0L, nickname#2, name#7, average_height#8L]
+- SortMergeJoin [breed_id#1L], [breed_id#6L], LeftOuter
:- *(2) Sort [breed_id#1L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(breed_id#1L, 200)
: +- *(1) Filter (isnotnull(nickname#2) && (nickname#2 = Chewie))
: +- Scan ExistingRDD[id#0L,breed_id#1L,nickname#2]
+- *(3) Sort [breed_id#6L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(breed_id#6L, 200)
+- Scan ExistingRDD[breed_id#6L,name#7,average_height#8L]
What Happened:
Because the column nickname is only present in the left side of the join, only the left side of the join was filtered before the join.
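If you do not want to rely on the optimizer, you can write the filter before the join yourself. A minimal sketch using the same pets and breeds DataFrames as above; the physical plan should be equivalent to the one shown:
df = (
    pets
    .filter(F.col('nickname') == 'Chewie')  # filter the left side explicitly
    .join(breeds, 'breed_id', 'left_outer')
)
df.explain()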
Case #2: Filter on Both Sides of the Join
df = (
    pets
    .join(breeds, 'breed_id', 'left_outer')
    .filter(F.col('breed_id') == 1)
)
df.toPandas()
| | breed_id | id | nickname | name | average_height |
|---|---|---|---|---|---|
| 0 | 1 | 1 | Bear | Pitbull | 10 |
| 1 | 1 | 2 | Chewie | Pitbull | 10 |
df.explain()
== Physical Plan ==
*(4) Project [breed_id#1L, id#0L, nickname#2, name#7, average_height#8L]
+- SortMergeJoin [breed_id#1L], [breed_id#6L], LeftOuter
:- *(2) Sort [breed_id#1L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(breed_id#1L, 200)
: +- *(1) Filter (isnotnull(breed_id#1L) && (breed_id#1L = 1))
: +- Scan ExistingRDD[id#0L,breed_id#1L,nickname#2]
+- *(3) Sort [breed_id#6L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(breed_id#6L, 200)
+- Scan ExistingRDD[breed_id#6L,name#7,average_height#8L]
What Happened:
The column breed_id is present in both sides of the join, but because this is a left outer join, only the left side was filtered before the join. Case #4 below shows how to filter the right side yourself.
Case #3: Filter on Both Sides of the Join #2
df = (
    pets
    .join(breeds, 'breed_id')
    .filter(F.col('breed_id') == 1)
)
df.toPandas()
| | breed_id | id | nickname | name | average_height |
|---|---|---|---|---|---|
| 0 | 1 | 1 | Bear | Pitbull | 10 |
| 1 | 1 | 2 | Chewie | Pitbull | 10 |
df.explain()
== Physical Plan ==
*(5) Project [breed_id#1L, id#0L, nickname#2, name#7, average_height#8L]
+- *(5) SortMergeJoin [breed_id#1L], [breed_id#6L], Inner
:- *(2) Sort [breed_id#1L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(breed_id#1L, 200)
: +- *(1) Filter (isnotnull(breed_id#1L) && (breed_id#1L = 1))
: +- Scan ExistingRDD[id#0L,breed_id#1L,nickname#2]
+- *(4) Sort [breed_id#6L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(breed_id#6L, 200)
+- *(3) Filter (isnotnull(breed_id#6L) && (breed_id#6L = 1))
+- Scan ExistingRDD[breed_id#6L,name#7,average_height#8L]
What Happened:
The column breed_id is present in both sides of the join, and because this join is an inner join (no join type was given, so the default was used), Spark was able to figure out that it should perform the filter on both sides before the join.
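If you would rather not depend on the optimizer inferring this, here is a minimal sketch that filters both inputs explicitly before the inner join; the physical plan should be equivalent to the one above:
df = (
    pets.filter(F.col('breed_id') == 1)  # filter the left side explicitly
    .join(
        breeds.filter(F.col('breed_id') == 1),  # and the right side explicitly
        'breed_id'
    )
)
df.explain()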
Case #4: Filter on Both Sides of the Join, Filter Beforehand
df = (
    pets
    .join(
        breeds.filter(F.col('breed_id') == 1),
        'breed_id',
        'left_outer'
    )
    .filter(F.col('breed_id') == 1)
)
df.toPandas()
| | breed_id | id | nickname | name | average_height |
|---|---|---|---|---|---|
| 0 | 1 | 1 | Bear | Pitbull | 10 |
| 1 | 1 | 2 | Chewie | Pitbull | 10 |
df.explain()
== Physical Plan ==
*(5) Project [breed_id#1L, id#0L, nickname#2, name#7, average_height#8L]
+- SortMergeJoin [breed_id#1L], [breed_id#6L], LeftOuter
:- *(2) Sort [breed_id#1L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(breed_id#1L, 200)
: +- *(1) Filter (isnotnull(breed_id#1L) && (breed_id#1L = 1))
: +- Scan ExistingRDD[id#0L,breed_id#1L,nickname#2]
+- *(4) Sort [breed_id#6L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(breed_id#6L, 200)
+- *(3) Filter (isnotnull(breed_id#6L) && (breed_id#6L = 1))
+- Scan ExistingRDD[breed_id#6L,name#7,average_height#8L]
What Happened:
The column breed_id is present in both sides of the join, and both sides were filtered before the join.
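For contrast, a minimal sketch of what should happen if you pre-filter breeds but drop the post-join filter: the left side is no longer filtered, so pets of other breeds still come through with nulls from the left outer join:
df = (
    pets
    .join(breeds.filter(F.col('breed_id') == 1), 'breed_id', 'left_outer')
)
# Roger (breed_id = 2) should still appear, with null name and average_height,
# and the plan should show no Filter above the pets scan.
df.toPandas()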
Summary
- To improve join performance, we should always try to push the filter before the joins.
- Spark might be smart enough to figure out that the filter can be performed on both sides, but not always.
- You should always check whether your Spark DAG is performant during a join and whether any filters can be pushed before the joins; the sketch below shows one way to do this check.
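One way to do that last check from PySpark is to capture the output of explain() and look for Filter steps above each scan. A minimal sketch; plan_string is a hypothetical helper, not a Spark API (explain() prints to stdout, so we redirect it):
import io
import contextlib

def plan_string(df):
    # df.explain() prints the physical plan to stdout; capture it as a string.
    buf = io.StringIO()
    with contextlib.redirect_stdout(buf):
        df.explain()
    return buf.getvalue()

plan = plan_string(df)
# Rough check: a filter pushed to both sides of the join shows up as two
# Filter steps in the physical plan.
print(plan.count('Filter'))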